Last Updated: 2025-11-19
Scope: SQL Mode rollout, Forecast/RUL roadmap, charting/documentation polish, and technical debt items carried over from the prior To-Do documents.
Forecasting code now lives in the core/forecasting.py module (import via `from core import forecasting`); the old files are kept as `*_deprecated.py` for reference. This document replaces the old Task Backlog.md. The two lists diverged and became hard to maintain, so they have been consolidated here. All day-to-day work should now be tracked in GitHub Issues instead of ad-hoc markdown tables. This file summarizes the active workstreams, how to label issues, and what is still genuinely outstanding after auditing the old backlog entries.
Issue titles follow the pattern AREA-ID: short description (e.g., SQL-12: Complete dual-write run matrix). Label each issue with one of SQL, Forecast, RUL, Charts, Docs, TechDebt, plus a priority label (P0, P1, P2). The old Task Backlog.md file has been removed. If you come across references to it, update them to point here or directly to the GitHub Issues board.
The sections below list the still-open themes from the merged backlogs. Each bullet either already has an associated issue or needs one created.
Open issues remain for:
- Build validate_dual_write.py to compare CSV vs SQL outputs automatically.
- Remove the scores.csv dependency so forecasting works in SQL-only runs.
- Publish per-sensor forecasts to ACM_SensorForecast_TS with diagnostics.
- Make rul_estimator.py consume SQL health timelines instead of CSVs.
- Harden test hooks in core/acm_main.py and ensure error truncation captures full stacks.

Use the following mapping when opening/triaging GitHub issues. Strike through entries if they are already covered by an existing issue or PR.
| Legacy ID | Area | Status | Action |
|---|---|---|---|
| SQL-12 | SQL Integration | Planned | Create/maintain issue covering the remaining dual-write validation matrix. |
| SQL-13 | SQL Integration | Planned | Issue for the CSV vs SQL comparison tool. |
| SQL-14 | SQL Integration | Planned | Issue for parity validation automation. |
| SQL-15 | SQL Integration | Planned | Issue for benchmarking SQL write times. |
| SQL-20/21/22/23 | Model Persistence | Planned | Either one umbrella issue or separate ones for save/load + integration + testing. |
| FCST-15/16 | Forecast | Planned | Two issues tracking SQL-only forecast support and sensor forecasts. |
| RUL-01/02 | RUL | Planned | Two issues covering SQL ingestion + probabilistic outputs. |
| CHART-19/20 | Charts/Docs | Planned | Open documentation + tooling issues. |
| DEBT-07/14/15 | Tech Debt | Planned | Create issues for structured error handling and test hooks. |
All other historical tasks were either completed (already struck through in previous versions) or removed because they duplicated the finished logging/chart work. If you discover a missing item while working through the code, open an issue instead of reintroducing a markdown TODO.
The two legacy backlog documents were audited on 2025-11-16. Duplicates were removed, chart/CSV-only tasks were archived, and completed logging upgrades were marked done. CHART-03/04/13/15/16/17/18, OUT-03/05/20/21/24/26/27/28, DEBT-04, CFG-07, SQL-57, and all logging upgrades are confirmed complete. For everything else, the single source of truth is now GitHub Issues. This document will be updated only when the set of workstreams changes (e.g., adding a new audit area or closing a pillar entirely).
Pending SQL-Related Tasks

🔴 HIGH PRIORITY - Empty Table Implementations
MEDIUM PRIORITY - Enhanced Feature Tables (Future Enhancements)
SQL INTEGRATION TASKS (Phase 3: Pure SQL Operation)

SQL-45: Remove CSV Output Writes ⏳ PENDING
- Objective: Eliminate all CSV file writes; keep SQL only
- Current State: Dual-write active (both CSV and SQL)
- Changes Required: remove write_dataframe() CSV writes from output_manager.py; keep SQL table writes only (ALLOWED_TABLES whitelist); remove scores.csv, episodes.csv, metrics.csv exports; keep charts/PNG generation (visual outputs are separate from storage)
- Impact: the artifacts directory will contain only charts, no data CSVs
- Estimated Effort: 3-4 hours
- Priority: MEDIUM (system working; cleanup for production)

SQL-46: Eliminate Model Filesystem Persistence ⏳ PENDING
- Objective: Remove .joblib file writes; use the SQL ModelRegistry only
- Current State: Models saved as .joblib files in artifacts/{equip}/models/
- Changes Required: remove filesystem save/load from model_persistence.py; keep SQL ModelRegistry writes only; remove the stable_models_dir fallback logic; remove .joblib file writes
- Impact: no model files in the filesystem; all models in SQL
- Estimated Effort: 4-6 hours
- Priority: MEDIUM (covered by SQL-20/21/22/23 below)
- Related: SQL-20/21/22/23 (ModelRegistry save/load)

SQL-50: End-to-End Pure SQL Validation ⏳ PENDING
- Objective: Validate complete SQL-only operation for 30+ days
- Validation Steps: run the full pipeline with storage_backend='sql'; verify no files are created in artifacts (except charts); verify all results land in SQL tables only; confirm the pipeline runs successfully start-to-finish; performance target: SQL write time <15s per run; stability target: 30+ days unattended operation
- Estimated Effort: ongoing validation (2-4 weeks of monitoring)
- Priority: MEDIUM (system working; formal validation needed)
VALIDATION & TESTING TASKS

SQL-12: Complete Dual-Write Validation Matrix ⏳ 6/10 REMAINING
- Objective: Validate CSV vs SQL output parity across all tables
- Current State: 4/10 validation runs completed
- Remaining: 6 more validation runs with different equipment/time windows
- Estimated Effort: 3-4 hours (1 hour per equipment run + analysis)
- Priority: HIGH (quality assurance)

SQL-13: Build validate_dual_write.py Comparison Tool 🆕 PLANNED
- Objective: Automated CSV vs SQL comparison for all 40+ tables
- Current State: No tool exists
- Implementation: compare row counts between CSV and SQL; compare column values (floating-point tolerance); report discrepancies automatically
- Estimated Effort: 4-6 hours
- Priority: HIGH (prevents manual validation burden)

SQL-14: Row-Count/Value Parity Checks 🆕 PLANNED
- Objective: Automated parity validation as part of the CI/test suite
- Current State: Manual validation only
- Implementation: pytest fixtures that compare outputs
- Estimated Effort: 3-5 hours
- Priority: MEDIUM

SQL-15: Baseline SQL Write Performance 🆕 PLANNED
- Objective: Benchmark SQL write times (<15s target per run)
- Current State: No formal benchmarks captured
- Implementation: Add timing instrumentation to OutputManager
- Estimated Effort: 2-3 hours
- Priority: MEDIUM
MODEL PERSISTENCE TO SQL

SQL-20: Save Detectors to ModelRegistry 🆕 PLANNED
- Objective: Serialize detector models to the SQL ModelRegistry table
- Current State: ModelRegistry table exists, no save logic
- Implementation: model_persistence.py serializes to binary + metadata
- Estimated Effort: 5-7 hours
- Priority: MEDIUM
- Blocked By: None

SQL-21: Load Detectors from ModelRegistry 🆕 PLANNED
- Objective: Deserialize detector models from SQL
- Current State: No load logic implemented
- Implementation: model_persistence.py fetch + deserialize
- Estimated Effort: 4-6 hours
- Priority: MEDIUM
- Depends On: SQL-20

SQL-22: Wire ModelRegistry into Training Pipeline 🆕 PLANNED
- Objective: Replace .joblib file save/load with SQL calls
- Current State: Pipeline uses the filesystem
- Implementation: Update acm_main.py training/loading logic
- Estimated Effort: 3-5 hours
- Priority: MEDIUM
- Depends On: SQL-20, SQL-21

SQL-23: Test ModelRegistry End-to-End 🆕 PLANNED
- Objective: Validate model persistence across runs
- Implementation: pytest fixtures validating the save → load → predict cycle
- Estimated Effort: 3-4 hours
- Priority: MEDIUM
- Depends On: SQL-22
FORECAST & RUL ENHANCEMENTS

FCST-15: Remove scores.csv Dependency ⏳ PENDING
- Objective: Forecasting works in SQL-only runs (no CSV dependency)
- Current State: forecast.py may still read scores.csv
- Implementation: Ensure forecasting reads from SQL tables only
- Estimated Effort: 2-3 hours
- Priority: HIGH (blocks SQL-only operation)

FCST-16: Per-Sensor/Regime Forecasts to ACM_SensorForecast_TS 🆕 PLANNED
- Objective: Publish sensor-level forecasts with regime breakdown
- Current State: Only equipment-level forecasts published
- Implementation: Expand forecast logic, write to ACM_SensorForecast_TS
- Estimated Effort: 6-8 hours
- Priority: MEDIUM

RUL-01: SQL-Based RUL Estimation ⏳ PENDING
- Objective: rul_estimator.py consumes SQL health timelines instead of CSVs
- Current State: May still read from CSV files
- Implementation: Update RUL data loading to query ACM_HealthTimeline
- Estimated Effort: 3-4 hours
- Priority: HIGH (blocks SQL-only operation)

RUL-02: Probabilistic RUL Bands ⏳ PENDING
- Objective: Add P10/P50/P90 RUL confidence intervals
- Current State: Single-point RUL estimates only
- Implementation: Add probabilistic modeling to rul_estimator.py
- Estimated Effort: 8-12 hours
- Priority: MEDIUM
TECHNICAL DEBT

DEBT-07: Tighten Error Handling ⏳ PENDING
- Objective: Robust error handling around detector training and SQL IO
- Current State: Some unhandled exceptions possible
- Implementation: Add try-except blocks with proper logging
- Estimated Effort: 4-6 hours
- Priority: MEDIUM

DEBT-14/15: Test Hooks & Path Handling ⏳ PENDING
- Objective: Improve test fixtures and error stack truncation
- Current State: Some test paths hard-coded
- Implementation: Refactor acm_main.py test support
- Estimated Effort: 3-5 hours
- Priority: LOW
SUMMARY BY PRIORITY
CRITICAL (Implement Now)
- ACM_DataQuality writes (2-4 hrs) - operations monitoring
- ACM_RecommendedActions writes (3-5 hrs) - operator decision support
HIGH PRIORITY (Next Sprint)
- SQL-12: Complete dual-write validation (3-4 hrs)
- SQL-13: Build validate_dual_write.py tool (4-6 hrs)
- FCST-15: Remove scores.csv dependency (2-3 hrs)
- RUL-01: SQL-based RUL estimation (3-4 hrs)
MEDIUM PRIORITY (Following Sprint)
- ACM_Scores_Wide writes (2-3 hrs) - nice-to-have pivot table
- SQL-14/15: Parity checks + performance benchmarking (5-8 hrs)
- SQL-20/21/22/23: ModelRegistry implementation (15-22 hrs total)
- SQL-45/46/50: Pure SQL operation cleanup (7-14 hrs)
- FCST-16, RUL-02: Enhanced forecasting/RUL (14-20 hrs)
- DEBT-07: Error handling improvements (4-6 hrs)

🔮 FUTURE ENHANCEMENTS (Backlog)
- ACM_FailureCausation (8-12 hrs)
- ACM_EnhancedFailureProbability_TS (6-10 hrs)
- ACM_EnhancedMaintenanceRecommendation (5-8 hrs)
- DEBT-14/15: Test infrastructure (3-5 hrs)
📊 EFFORT ESTIMATE TOTALS
- Critical: 5-9 hours
- High Priority: 12-17 hours
- Medium Priority: 43-73 hours
- Future Enhancements: 22-35 hours
- Total Pending SQL Work: ~82-134 hours (10-17 developer days)
Current architecture treats each batch in isolation:
User Requirements:
File: core/model_persistence.py
Changes:
@dataclass
class ForecastState:
"""Persistent state for continuous forecasting between batches."""
equip_id: int
state_version: int
model_type: str # "AR1", "ARIMA", "ETS"
model_params: Dict[str, Any] # {phi, mu, sigma} for AR1
residual_variance: float
last_forecast_horizon: pd.DataFrame # Timestamp, ForecastHealth, CI_Lower, CI_Upper
hazard_baseline: float # EWMA smoothed hazard rate
last_retrain_time: datetime
training_data_hash: str
training_window_hours: int
forecast_quality: Dict[str, float] # {rmse, mae, mape}
def save_forecast_state(state: ForecastState, artifact_root: Path, equip: str) -> None:
"""Serialize ForecastState to JSON in artifacts/{equip}/models/forecast_state.json"""
def load_forecast_state(artifact_root: Path, equip: str) -> Optional[ForecastState]:
"""Deserialize ForecastState from JSON. Returns None if not found."""
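The two stubs above could be fleshed out roughly as follows. This is a minimal sketch, not the shipped implementation: the JSON handling for the DataFrame and datetime fields is an assumption, and the dataclass is trimmed to the fields the demo needs.

```python
# Sketch of save/load for ForecastState (hedged: field handling for the
# DataFrame and datetime members is an assumption, not the shipped code).
import io
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

import pandas as pd


@dataclass
class ForecastState:  # trimmed to the fields needed for this demo
    equip_id: int
    state_version: int
    model_params: Dict[str, Any]
    last_forecast_horizon: pd.DataFrame
    last_retrain_time: datetime


def save_forecast_state(state: ForecastState, artifact_root: Path, equip: str) -> None:
    """Serialize ForecastState to artifacts/{equip}/models/forecast_state.json."""
    out_dir = artifact_root / equip / "models"
    out_dir.mkdir(parents=True, exist_ok=True)
    payload = asdict(state)
    # Non-JSON-native fields are converted explicitly
    payload["last_forecast_horizon"] = state.last_forecast_horizon.to_json(
        orient="records", date_format="iso"
    )
    payload["last_retrain_time"] = state.last_retrain_time.isoformat()
    (out_dir / "forecast_state.json").write_text(json.dumps(payload))


def load_forecast_state(artifact_root: Path, equip: str) -> Optional[ForecastState]:
    """Deserialize ForecastState; returns None when no state has been saved yet."""
    path = artifact_root / equip / "models" / "forecast_state.json"
    if not path.exists():
        return None
    payload = json.loads(path.read_text())
    payload["last_forecast_horizon"] = pd.read_json(
        io.StringIO(payload["last_forecast_horizon"]), orient="records"
    )
    payload["last_retrain_time"] = datetime.fromisoformat(payload["last_retrain_time"])
    return ForecastState(**payload)
```

Returning None on a missing file keeps the first batch of a fresh equipment on the cold-start path without special-casing.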
SQL Table: ACM_ForecastState
CREATE TABLE ACM_ForecastState (
EquipID INT NOT NULL,
StateVersion INT NOT NULL,
ModelType NVARCHAR(50),
ModelParamsJson NVARCHAR(MAX), -- JSON serialized params
ResidualVariance FLOAT,
LastForecastHorizonJson NVARCHAR(MAX), -- JSON array of forecast points
HazardBaseline FLOAT,
LastRetrainTime DATETIME2,
TrainingDataHash NVARCHAR(64),
TrainingWindowHours INT,
ForecastQualityJson NVARCHAR(MAX), -- {rmse, mae, mape}
CreatedAt DATETIME2 DEFAULT GETDATE(),
PRIMARY KEY (EquipID, StateVersion)
)
Effort: 4-6 hours
Files Modified: core/model_persistence.py, core/acm_main.py
File: core/forecasting.py
Function: run_enhanced_forecasting_sql()
Changes:
prev_state = load_forecast_state(artifact_root, equip)
if prev_state:
Console.info(f"[FORECAST] Loaded state v{prev_state.state_version}, last retrain {prev_state.last_retrain_time}")
# OLD: use only current batch health data
# NEW: combine last 72h from ACM_HealthTimeline + current batch
lookback_hours = 72
cutoff_time = current_batch_start - timedelta(hours=lookback_hours)
df_health_combined = load_health_timeline(sql_client, equip_id, since=cutoff_time)
def should_retrain(prev_state, current_metrics, config, sql_client, equip_id) -> Tuple[bool, str]:
"""
Decide if full retrain needed.
Returns: (retrain_needed, reason)
"""
drift_threshold = config.get("forecasting", {}).get("drift_retrain_threshold", 1.5)
energy_threshold = config.get("forecasting", {}).get("energy_spike_threshold", 1.5)
error_threshold = config.get("forecasting", {}).get("forecast_error_threshold", 2.0)
# Check drift
drift_recent = get_recent_drift_metrics(sql_client, equip_id, window_hours=6)
if drift_recent["DriftValue"].mean() > drift_threshold:
return True, f"Drift exceeded {drift_threshold}"
# Check anomaly energy spike
energy_p95 = current_metrics.get("anomaly_energy_p95", 0)
energy_median = current_metrics.get("anomaly_energy_median", 1)
if energy_p95 > energy_threshold * energy_median:
return True, f"Energy spike {energy_p95:.2f} > {energy_threshold}x median"
# Check forecast quality degradation
if prev_state and prev_state.forecast_quality:
prev_rmse = prev_state.forecast_quality.get("rmse", 0)
current_rmse = current_metrics.get("forecast_rmse", 0)
if prev_rmse > 0 and current_rmse > error_threshold * prev_rmse:
return True, f"Forecast error {current_rmse:.2f} > {error_threshold}x baseline"
return False, "Model stable, incremental update"
# In main forecast logic:
retrain_needed, retrain_reason = should_retrain(prev_state, current_metrics, config)
if not retrain_needed and prev_state:
# Incremental update: use existing model params, extend training window
model_params = prev_state.model_params
Console.info(f"[FORECAST] Incremental update: {retrain_reason}")
else:
# Full retrain
model_params = train_forecast_model(df_health_combined)
Console.info(f"[FORECAST] Full retrain: {retrain_reason}")
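The energy-spike branch of the retrain decision can be unit-tested in isolation if it is factored into a pure helper. `energy_spike_retrain` is a hypothetical name for this sketch; the real should_retrain also consults SQL drift metrics and forecast-quality history.

```python
# Pure-function version of the energy-spike retrain check (hypothetical
# helper; the shipped should_retrain also queries recent drift from SQL).
from typing import Dict, Tuple


def energy_spike_retrain(metrics: Dict[str, float],
                         energy_threshold: float = 1.5) -> Tuple[bool, str]:
    """Retrain if the p95 anomaly energy spikes above threshold x median."""
    p95 = metrics.get("anomaly_energy_p95", 0.0)
    median = metrics.get("anomaly_energy_median", 1.0)
    if p95 > energy_threshold * median:
        return True, f"Energy spike {p95:.2f} > {energy_threshold}x median"
    return False, "Energy within normal band"
```

Keeping each trigger as a pure function of the metrics dict makes the full decision a simple OR over independently testable checks.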
def merge_forecast_horizons(
prev_horizon: pd.DataFrame, # columns: Timestamp, ForecastHealth, CI_Lower, CI_Upper
new_horizon: pd.DataFrame,
current_time: datetime,
blend_tau_hours: float = 12.0
) -> pd.DataFrame:
"""
Merge overlapping forecast horizons with exponential decay weighting.
Logic:
- Keep all past points (Timestamp < current_time)
- For overlapping future points, blend: w_new = 1 - exp(-dt/tau), w_prev = exp(-dt/tau)
- Append non-overlapping new points
"""
if prev_horizon.empty:
return new_horizon
# Filter to future points only
prev_future = prev_horizon[prev_horizon["Timestamp"] >= current_time].copy()
new_future = new_horizon[new_horizon["Timestamp"] >= current_time].copy()
if prev_future.empty:
return new_horizon
# Merge on timestamp
merged = pd.merge(
prev_future, new_future,
on="Timestamp", how="outer", suffixes=("_prev", "_new")
).sort_values("Timestamp")
# Calculate blend weights
dt_hours = (merged["Timestamp"] - current_time).dt.total_seconds() / 3600
w_new = 1 - np.exp(-dt_hours / blend_tau_hours)
w_prev = np.exp(-dt_hours / blend_tau_hours)
# Blend values; where a point exists in only one horizon, fall back to
# that horizon's value instead of diluting it toward zero via fillna(0)
for col in ("ForecastHealth", "CI_Lower", "CI_Upper"):
    merged[col] = (
        merged[f"{col}_new"].fillna(merged[f"{col}_prev"]) * w_new +
        merged[f"{col}_prev"].fillna(merged[f"{col}_new"]) * w_prev
    )
return merged[["Timestamp", "ForecastHealth", "CI_Lower", "CI_Upper"]]
# In main forecast logic:
merged_horizon = merge_forecast_horizons(
prev_state.last_forecast_horizon if prev_state else pd.DataFrame(),
new_forecast_df,
current_batch_time,
blend_tau_hours=config.get("forecasting", {}).get("blend_tau_hours", 12.0)
)
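The blending weights themselves can be sanity-checked standalone: by construction they sum to one at every horizon point, so merging never inflates or deflates a forecast, and the previous horizon dominates near the current time while the new one dominates far out.

```python
# Standalone check of the exponential blending weights used by
# merge_forecast_horizons: w_new = 1 - exp(-dt/tau), w_prev = exp(-dt/tau).
import numpy as np


def blend_weights(dt_hours: np.ndarray, tau_hours: float = 12.0):
    """Return (w_new, w_prev) for forecast points dt_hours ahead of now."""
    w_prev = np.exp(-dt_hours / tau_hours)
    return 1.0 - w_prev, w_prev


dt = np.array([0.0, 6.0, 12.0, 48.0])
w_new, w_prev = blend_weights(dt)

# Weights always sum to 1, so blending is a true convex combination
assert np.allclose(w_new + w_prev, 1.0)
# At dt=0 the previous horizon wins outright; 4 tau out, the new one dominates
assert w_prev[0] == 1.0 and w_new[-1] > 0.98
```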
new_state = ForecastState(
equip_id=equip_id,
state_version=(prev_state.state_version + 1) if prev_state else 1,
model_type="AR1",
model_params=model_params,
residual_variance=residual_variance,
last_forecast_horizon=merged_horizon,
hazard_baseline=smoothed_hazard, # see FORECAST-STATE-03
last_retrain_time=datetime.now() if (retrain_needed or prev_state is None) else prev_state.last_retrain_time,
training_data_hash=compute_hash(df_health_combined),
training_window_hours=lookback_hours,
forecast_quality={"rmse": rmse, "mae": mae, "mape": mape}
)
save_forecast_state(new_state, artifact_root, equip)
Effort: 10-14 hours
Files Modified: core/forecasting.py, core/acm_main.py
File: core/forecasting.py
New Function: smooth_failure_probability_hazard()
Implementation:
def smooth_failure_probability_hazard(
prev_hazard_baseline: float,
new_probability_series: pd.Series, # Index: Timestamp, Values: discrete batch probabilities
dt_hours: float = 1.0,
alpha: float = 0.3
) -> pd.DataFrame:
"""
Convert discrete batch probabilities to continuous hazard with EWMA smoothing.
Math:
- Hazard rate: lambda(t) = -ln(1 - p(t)) / dt
- EWMA smoothing: lambda_smooth[t] = alpha * lambda_raw[t] + (1-alpha) * lambda_smooth[t-1]
- Survival probability: S(t) = exp(-integral_0^t lambda_smooth(u) du)
- Failure probability: F(t) = 1 - S(t)
Returns: DataFrame with columns [Timestamp, HazardRaw, HazardSmooth, Survival, FailureProb]
"""
df_result = pd.DataFrame(index=new_probability_series.index)
# Convert probability to hazard rate
p_clipped = new_probability_series.clip(1e-9, 1 - 1e-9) # Avoid log(0)
lambda_raw = -np.log(1 - p_clipped) / dt_hours
df_result["HazardRaw"] = lambda_raw
# EWMA smoothing, seeded from the previous batch's smoothed baseline;
# guard against an empty input series
lambda_smooth = np.zeros(len(lambda_raw))
if len(lambda_raw) > 0:
    lambda_smooth[0] = alpha * lambda_raw.iloc[0] + (1 - alpha) * prev_hazard_baseline
    for i in range(1, len(lambda_raw)):
        lambda_smooth[i] = alpha * lambda_raw.iloc[i] + (1 - alpha) * lambda_smooth[i - 1]
df_result["HazardSmooth"] = lambda_smooth
# Compute survival and failure probability
cumulative_hazard = np.cumsum(lambda_smooth * dt_hours)
df_result["Survival"] = np.exp(-cumulative_hazard)
df_result["FailureProb"] = 1 - df_result["Survival"]
df_result["Timestamp"] = df_result.index
return df_result.reset_index(drop=True)
# In run_enhanced_forecasting_sql():
df_hazard = smooth_failure_probability_hazard(
prev_hazard_baseline=prev_state.hazard_baseline if prev_state else 0.0,
new_probability_series=failure_probs_df.set_index("Timestamp")["FailureProbability"],
dt_hours=1.0,
alpha=config.get("forecasting", {}).get("hazard_smoothing_alpha", 0.3)
)
# Write to ACM_FailureHazard_TS
output_manager.write_dataframe(
df_hazard,
tables_dir / "failure_hazard.csv",
sql_table="ACM_FailureHazard_TS",
add_created_at=True
)
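A quick numeric check of the hazard math above: for a constant per-step failure probability p, the survival implied by the hazard integral equals (1-p)^n by construction, and the EWMA of a constant series converges to that constant regardless of alpha.

```python
# Numeric check of the hazard identities: lambda = -ln(1-p)/dt implies
# exp(-n*lambda*dt) = (1-p)**n, so discrete and continuous views agree.
import numpy as np

p, dt, alpha, n = 0.05, 1.0, 0.3, 24
lam = -np.log(1.0 - p) / dt

# EWMA of a constant hazard series converges to that constant
lam_smooth = 0.0
for _ in range(n):
    lam_smooth = alpha * lam + (1 - alpha) * lam_smooth
assert abs(lam_smooth - lam) < 0.01 * lam  # converged within 1% after 24 steps

survival_exact = (1.0 - p) ** n
survival_hazard = np.exp(-n * lam * dt)
assert abs(survival_exact - survival_hazard) < 1e-12  # identical by construction
```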
SQL Table: ACM_FailureHazard_TS
CREATE TABLE ACM_FailureHazard_TS (
Timestamp DATETIME2 NOT NULL,
HazardRaw FLOAT,
HazardSmooth FLOAT,
Survival FLOAT,
FailureProb FLOAT,
RunID INT,
EquipID INT,
CreatedAt DATETIME2 DEFAULT GETDATE(),
PRIMARY KEY (EquipID, RunID, Timestamp)
)
Effort: 6-8 hours
Files Modified: core/forecasting.py
File: core/enhanced_rul_estimator.py
Function: estimate_rul_and_failure()
Changes:
def compute_rul_multipath(
health_forecast: pd.DataFrame, # Timestamp, ForecastHealth, CI_Lower, CI_Upper
hazard_df: pd.DataFrame, # Timestamp, FailureProb
anomaly_energy_df: pd.DataFrame, # Timestamp, CumulativeEnergy
current_time: datetime,
config: Dict[str, Any]
) -> Dict[str, Any]:
"""
Compute RUL via three independent paths, take minimum.
Path 1 (Trajectory): Find first Timestamp where ForecastHealth <= threshold
Path 2 (Hazard): Find first Timestamp where FailureProb >= 0.5
Path 3 (Energy): Find first Timestamp where CumulativeEnergy >= E_fail (calibrated)
Returns: {
"rul_trajectory_hours": float,
"rul_hazard_hours": float,
"rul_energy_hours": float,
"rul_final_hours": float, # min(t1, t2, t3)
"confidence_band_hours": float, # CI_Upper crossing - CI_Lower crossing
"dominant_path": str # "trajectory" | "hazard" | "energy"
}
"""
health_threshold = config.get("forecasting", {}).get("failure_threshold", 75.0)
energy_fail_threshold = config.get("forecasting", {}).get("energy_fail_threshold", 1000.0)
# Path 1: Trajectory crossing
trajectory_crossing = health_forecast[health_forecast["ForecastHealth"] <= health_threshold]
if not trajectory_crossing.empty:
t1 = trajectory_crossing.iloc[0]["Timestamp"]
rul_trajectory = (t1 - current_time).total_seconds() / 3600
else:
rul_trajectory = np.inf
# Path 2: Hazard accumulation
hazard_crossing = hazard_df[hazard_df["FailureProb"] >= 0.5]
if not hazard_crossing.empty:
t2 = hazard_crossing.iloc[0]["Timestamp"]
rul_hazard = (t2 - current_time).total_seconds() / 3600
else:
rul_hazard = np.inf
# Path 3: Anomaly energy threshold
energy_crossing = anomaly_energy_df[anomaly_energy_df["CumulativeEnergy"] >= energy_fail_threshold]
if not energy_crossing.empty:
t3 = energy_crossing.iloc[0]["Timestamp"]
rul_energy = (t3 - current_time).total_seconds() / 3600
else:
rul_energy = np.inf
# Final RUL = minimum of the three paths
rul_final = min(rul_trajectory, rul_hazard, rul_energy)
# Record the dominant path before capping, so the cap below cannot
# relabel it (an all-infinite result falls back to "trajectory")
if rul_final == rul_trajectory:
    dominant_path = "trajectory"
elif rul_final == rul_hazard:
    dominant_path = "hazard"
else:
    dominant_path = "energy"
if rul_final == np.inf:
    rul_final = config.get("forecasting", {}).get("max_forecast_hours", 168.0)
# Confidence band from the trajectory CI crossings
ci_lower_crossing = health_forecast[health_forecast["CI_Lower"] <= health_threshold]
ci_upper_crossing = health_forecast[health_forecast["CI_Upper"] <= health_threshold]
if not ci_lower_crossing.empty and not ci_upper_crossing.empty:
    t_lower = (ci_lower_crossing.iloc[0]["Timestamp"] - current_time).total_seconds() / 3600
    t_upper = (ci_upper_crossing.iloc[0]["Timestamp"] - current_time).total_seconds() / 3600
    confidence_band = abs(t_upper - t_lower)
else:
    confidence_band = 0.0
return {
"rul_trajectory_hours": rul_trajectory if rul_trajectory != np.inf else None,
"rul_hazard_hours": rul_hazard if rul_hazard != np.inf else None,
"rul_energy_hours": rul_energy if rul_energy != np.inf else None,
"rul_final_hours": rul_final,
"confidence_band_hours": confidence_band,
"dominant_path": dominant_path
}
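Each path reduces to the same first-threshold-crossing pattern, which can be exercised on synthetic data. Column names follow the docstring above; the 75.0 threshold is the config default.

```python
# Synthetic demo of the first-crossing logic each RUL path relies on:
# find the first timestamp where the series breaches its threshold and
# convert that to hours from now.
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

now = datetime(2025, 11, 19, 0, 0)
horizon = pd.DataFrame({
    "Timestamp": [now + timedelta(hours=h) for h in range(1, 49)],
    # Health declines 0.75 pts/hour from 95, so it crosses 75 at hour 27
    "ForecastHealth": [95.0 - 0.75 * h for h in range(1, 49)],
})

crossing = horizon[horizon["ForecastHealth"] <= 75.0]
rul_hours = ((crossing.iloc[0]["Timestamp"] - now).total_seconds() / 3600
             if not crossing.empty else np.inf)
```

Taking the minimum over three such crossings is what makes the final estimate conservative: whichever degradation signal crosses first wins.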
# Update ACM_RUL_Summary schema to include new columns
SQL Table Update: ACM_RUL_Summary
ALTER TABLE ACM_RUL_Summary ADD
RUL_Trajectory_Hours FLOAT,
RUL_Hazard_Hours FLOAT,
RUL_Energy_Hours FLOAT,
RUL_Final_Hours FLOAT,
ConfidenceBand_Hours FLOAT,
DominantPath NVARCHAR(20)
Effort: 8-10 hours
Files Modified: core/enhanced_rul_estimator.py, core/forecasting.py
Files: docs/RUL_METHOD.md (new), scripts/evaluate_rul_backtest.py, dashboard Panel 36
Definition:
FAILURE CONDITION (Unified):
A failure event is detected when ANY of the following occurs:
1. SUSTAINED LOW HEALTH: HealthIndex < 75 for >= 4 consecutive hours (>=4 data points @ 1h freq)
2. CRITICAL EPISODE: Episode with Severity='CRITICAL' logged in ACM_CulpritHistory
3. ACUTE ANOMALY: FusedZ >= 3.0 for >= 2 consecutive hours (>=2 data points @ 1h freq)
OPTIONAL PRE-FAILURE MARKER (Early Warning):
- DriftValue > 1.5 AND anomaly_energy_slope > threshold (indicates degradation trend)
RATIONALE:
- Condition 1: Captures gradual degradation (slow health decline)
- Condition 2: Captures known critical events (episode detection)
- Condition 3: Captures sudden acute failures (sensor spikes)
Implementation in evaluate_rul_backtest.py:
def identify_unified_failures(
sql_client,
equip_id: int,
health_threshold: float = 75.0,
health_sustain_hours: int = 4,
fused_z_threshold: float = 3.0,
fused_z_sustain_hours: int = 2
) -> pd.DataFrame:
"""
Identify failure events using unified condition.
Returns DataFrame: [FailureTime, FailureType, Severity]
"""
failures = []
# Condition 1: Sustained low health
health_failures = identify_health_threshold_failures(
sql_client, equip_id, health_threshold, health_sustain_hours
)
for ts in health_failures:
failures.append({"FailureTime": ts, "FailureType": "SUSTAINED_LOW_HEALTH", "Severity": "HIGH"})
# Condition 2: Critical episodes
critical_episodes = fetch_critical_episodes(sql_client, equip_id)
for ts in critical_episodes:
failures.append({"FailureTime": ts, "FailureType": "CRITICAL_EPISODE", "Severity": "CRITICAL"})
# Condition 3: Acute FusedZ spikes
fused_z_failures = identify_fused_z_spikes(
sql_client, equip_id, fused_z_threshold, fused_z_sustain_hours
)
for ts in fused_z_failures:
failures.append({"FailureTime": ts, "FailureType": "ACUTE_ANOMALY", "Severity": "HIGH"})
df_failures = pd.DataFrame(failures)
if df_failures.empty:
return df_failures
# Remove duplicates within a 24h window (same failure event).
# Note: diff() is NaT for the first row, so keep it explicitly rather
# than letting the > 24 comparison silently drop the first failure.
df_failures = df_failures.sort_values("FailureTime").reset_index(drop=True)
gap_hours = df_failures["FailureTime"].diff().dt.total_seconds() / 3600
df_failures = df_failures[gap_hours.isna() | (gap_hours > 24)]
return df_failures
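The 24-hour deduplication step deserves a standalone check, since pandas' diff() returns NaT for the first row and a naive `> 24` filter would silently drop the very first failure event:

```python
# Standalone check of the 24h failure-event deduplication, keeping the
# first event explicitly (its diff() gap is NaT, not a number).
import pandas as pd

events = pd.DataFrame({"FailureTime": pd.to_datetime(
    ["2025-01-01 00:00", "2025-01-01 06:00", "2025-01-03 00:00"])})

events = events.sort_values("FailureTime").reset_index(drop=True)
gap_hours = events["FailureTime"].diff().dt.total_seconds() / 3600
deduped = events[gap_hours.isna() | (gap_hours > 24)]

# The 06:00 repeat is folded into the first event; Jan 3 is a new event
assert len(deduped) == 2
```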
Dashboard Update (Panel 36): Add markdown section explaining failure condition in plain language.
Effort: 4-5 hours
Files Modified: scripts/evaluate_rul_backtest.py, grafana_dashboards/asset_health_dashboard.json, docs/RUL_METHOD.md (new)
Grafana Panel: New panel "Predicted Defect Signature"
Query:
-- Show detector contribution breakdown in forecast window
WITH LatestRun AS (
SELECT MAX(RunID) AS RunID
FROM ACM_Runs
WHERE EquipID = $equipment
),
ForecastWindow AS (
SELECT Timestamp
FROM ACM_HealthForecast_Continuous
WHERE EquipID = $equipment
AND Timestamp >= DATEADD(HOUR, -24, GETDATE())
AND Timestamp <= DATEADD(HOUR, 24, GETDATE())
)
SELECT
ct.Timestamp,
ct.DetectorName,
ct.ContributionPct,
sh.SensorName,
sh.AbsZScore
FROM ACM_ContributionTimeline ct
CROSS APPLY (SELECT RunID FROM LatestRun) lr
LEFT JOIN ACM_SensorHotspots sh
ON sh.RunID = lr.RunID AND sh.EquipID = $equipment
WHERE ct.EquipID = $equipment
AND ct.RunID = lr.RunID
AND ct.Timestamp IN (SELECT Timestamp FROM ForecastWindow)
ORDER BY ct.Timestamp, ct.ContributionPct DESC
Visualization: Stacked area chart showing detector contributions over forecast horizon + table showing top 5 sensor hotspots
Effort: 3-4 hours
Files Modified: grafana_dashboards/asset_health_dashboard.json
Grafana Panel: New panel "RUL Projection with Failure Threshold"
Query:
-- Health forecast with threshold crossing marker
SELECT
Timestamp,
ForecastHealth,
CI_Lower,
CI_Upper,
75.0 AS FailureThreshold
FROM ACM_HealthForecast_Continuous
WHERE EquipID = $equipment
AND Timestamp >= DATEADD(HOUR, -12, GETDATE())
AND Timestamp <= DATEADD(HOUR, 48, GETDATE())
ORDER BY Timestamp
-- Add annotation query for projected failure time
SELECT
RUL_Final_Hours,
DATEADD(HOUR, RUL_Final_Hours, GETDATE()) AS ProjectedFailureTime,
DominantPath,
ConfidenceBand_Hours
FROM ACM_RUL_Summary
WHERE EquipID = $equipment
AND RunID = (SELECT MAX(RunID) FROM ACM_Runs WHERE EquipID = $equipment)
Visualization:
Effort: 4-5 hours
Files Modified: grafana_dashboards/asset_health_dashboard.json
Grafana Panel: New panel "Model Retraining Status"
SQL Table Update: ACM_RunMetadata
ALTER TABLE ACM_RunMetadata ADD
RetrainDecision NVARCHAR(50), -- "FULL_RETRAIN", "INCREMENTAL_UPDATE", "NO_RETRAIN"
RetrainReason NVARCHAR(500),
LastRetrainRunID INT,
ModelAgeInBatches INT,
ForecastQualityRMSE FLOAT
Query:
SELECT
rm.CreatedAt AS Timestamp,
rm.RetrainDecision,
rm.RetrainReason,
rm.ModelAgeInBatches,
rm.ForecastQualityRMSE,
CASE
WHEN rm.RetrainDecision = 'FULL_RETRAIN' THEN 'Retrained'
WHEN rm.ModelAgeInBatches > 10 AND rm.RetrainDecision != 'FULL_RETRAIN' THEN 'Retrain Recommended'
ELSE 'Model Current'
END AS RetrainStatus
FROM ACM_RunMetadata rm
WHERE rm.EquipID = $equipment
AND rm.CreatedAt >= $__timeFrom
AND rm.CreatedAt <= $__timeTo
ORDER BY rm.CreatedAt DESC
Visualization:
Effort: 3-4 hours
Files Modified: core/acm_main.py (log retrain decision to ACM_RunMetadata), grafana_dashboards/asset_health_dashboard.json
SQL Table: ACM_HealthForecast_Continuous
CREATE TABLE ACM_HealthForecast_Continuous (
Timestamp DATETIME2 NOT NULL,
ForecastHealth FLOAT NOT NULL,
CI_Lower FLOAT,
CI_Upper FLOAT,
SourceRunID INT NOT NULL, -- RunID that contributed this forecast point
MergeWeight FLOAT, -- Temporal blending weight (0-1)
EquipID INT NOT NULL,
CreatedAt DATETIME2 DEFAULT GETDATE(),
PRIMARY KEY (EquipID, Timestamp, SourceRunID)
)
CREATE INDEX IX_HealthForecast_TimeRange
ON ACM_HealthForecast_Continuous(EquipID, Timestamp)
Writer Logic in forecasting.py:
def write_continuous_health_forecast(
merged_horizon: pd.DataFrame, # Already has Timestamp, ForecastHealth, CI_Lower, CI_Upper
run_id: int,
equip_id: int,
output_manager: Any,
tables_dir: Path
) -> None:
"""
Write merged forecast horizon to ACM_HealthForecast_Continuous.
Append-only, no deletion of old forecasts (pruning handled separately).
"""
df_write = merged_horizon.copy()
df_write["SourceRunID"] = run_id
df_write["EquipID"] = equip_id
df_write["MergeWeight"] = 1.0 # Default weight, can be refined
output_manager.write_dataframe(
df_write,
tables_dir / "health_forecast_continuous.csv",
sql_table="ACM_HealthForecast_Continuous",
add_created_at=True
)
# Prune forecasts older than 7 days (cleanup)
if output_manager.sql_client:
try:
cur = output_manager.sql_client.cursor()
cutoff_time = datetime.now() - timedelta(days=7)
cur.execute("""
DELETE FROM ACM_HealthForecast_Continuous
WHERE EquipID = ? AND Timestamp < ?
""", (equip_id, cutoff_time))
output_manager.sql_client.conn.commit()
except Exception as e:
Console.warn(f"[FORECAST] Failed to prune old forecasts: {e}")
Effort: 3-4 hours
Files Modified: core/forecasting.py, SQL schema script
| Task ID | Description | Priority | Effort | Files Modified |
|---|---|---|---|---|
| FORECAST-STATE-01 | ForecastState persistence class | 🔴 CRITICAL | 4-6h | model_persistence.py, acm_main.py |
| FORECAST-STATE-02 | Continuous forecasting logic | 🔴 CRITICAL | 10-14h | forecasting.py, acm_main.py |
| FORECAST-STATE-03 | Hazard-based probability smoothing | 🔴 CRITICAL | 6-8h | forecasting.py |
| FORECAST-STATE-04 | Multi-path RUL derivation | 🔴 CRITICAL | 8-10h | enhanced_rul_estimator.py, forecasting.py |
| FORECAST-STATE-05 | Unified failure condition | 🔴 CRITICAL | 4-5h | evaluate_rul_backtest.py, docs/RUL_METHOD.md, dashboard |
| FORECAST-STATE-06 | Defect type display | 🟡 MEDIUM | 3-4h | dashboard JSON |
| FORECAST-STATE-07 | RUL visualization panel | 🔴 CRITICAL | 4-5h | dashboard JSON |
| FORECAST-STATE-08 | Retraining indicator | 🟡 MEDIUM | 3-4h | acm_main.py, dashboard JSON |
| FORECAST-STATE-09 | Continuous table schema | 🔴 CRITICAL | 3-4h | forecasting.py, SQL scripts |
Total Effort: 45-63 hours (6-8 developer days)
Critical Path: STATE-01 → STATE-02 → STATE-03 → STATE-04 → STATE-09 → STATE-07
Rollout note: continuous forecasting ships behind a config flag (forecasting.enable_continuous: false by default).

RECENTLY COMPLETED (For Context)
- Fixed batch data truncation (60% data loss eliminated)
- Fixed Unicode encoding errors in Heartbeat spinner
- Comprehensive 46-table analysis (92/100 data quality score)
- SQL logging infrastructure (SQL-57 complete)
- All 40 core analytics tables actively populating
- Forecasting & RUL tables working (1,104+ rows each)
- Health timeline, sensor hotspots, regime detection all operational